package com.atguigu.flink.chapter11.window;

import com.atguigu.flink.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Author lzc
 * @Date 2022/7/12 11:12
 */
public class Flink02_TVF_1 {
    public static void main(String[] args) throws Exception {
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        DataStream<WaterSensor> stream = env
            .fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 6001L, 60),
                new WaterSensor("sensor_1", 16001L, 60),
                new WaterSensor("sensor_1", 18001L, 60),
                new WaterSensor("sensor_1", 21001L, 60),
                new WaterSensor("sensor_1", 27001L, 60),
                new WaterSensor("sensor_1", 31000L, 60)
            )
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    .withTimestampAssigner((ws, ts) -> ws.getTs())
            );
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        
        Table table = tEnv.fromDataStream(stream, $("id"), $("ts"), $("vc"), $("et").rowtime());
        tEnv.createTemporaryView("sensor", table);
        
        
        /*tEnv
            .sqlQuery("select " +
                          " id, " +
                          " window_start, " +
                          " window_end, " +
                          " sum(vc) vc_sum " +
                          "from table( tumble( table sensor, descriptor(et), interval '5' second )) " +
                          "group by id, window_start, window_end")
            .execute()
            .print();*/
    
        tEnv
            .sqlQuery("select " +
                          " id, " +
                          " window_start, " +
                          " window_end, " +
                          " sum(vc) vc_sum " +
                          "from table( cumulate( table sensor, descriptor(et),  interval '2' second, interval '10' second )) " +
                          "group by id, window_start, window_end")
            .execute()
            .print();
        
    }
}
